package com.xkcoding.mq.rabbitmq.receiver;




import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.transaction.annotation.Transactional;

import java.util.concurrent.*;

/**
 * [Description]: 接口调用结果执行
 *
 * @author : xh
 * @date : 2021-08-12 16:56
 */
@Slf4j
public class ApiTaskExecute {
    /**
     * 最大线程数  注意这里要根据不同的数据方进行调整
     */
    private static final int THREAD_POOL_SIZE = 10;

    private final ThreadPoolExecutor executor = new ThreadPoolExecutor(
        THREAD_POOL_SIZE,
        THREAD_POOL_SIZE,
        0L,
        TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<>(),
        Executors.defaultThreadFactory(),
        //注意这里使用的事默认拒绝策略  会抛出异常
        new ThreadPoolExecutor.CallerRunsPolicy());

    /**
     * 功能描述:获取数据并进行拼装 <br>
     * 〈〉
     *
     * @Param: [groupById]
     * @Return: java.util.List<com.hare.zebra.util.internal.ApiTask>
     * @Author: xh
     * @Date: 2021/8/26 15:32
     */


    /**
     * @param consumer 报告分组id
     * @return
     * @Author xh
     * @Date 2021/8/12
     * @Time 17:01
     **/
    @Transactional(rollbackFor = Exception.class)
    public void execute(String consumer) {
        //传入线程里的入参
        //创建线程池，其中任务队列需要结合实际情况设置合理的容量
        //正常情况 最大线程数 应该为 cpu的密集型  可以通过 Runtime.getRuntime().availableProcessors() 获取核心数
        //提交
        ApiTask apiTask = new ApiTask().setNumber(consumer);
        //传入线程入参 ,然后把每条线程执行的结果用集合封装起来,
        Future<String> submit = executor.submit(apiTask);
        //shutdown的原理是只是将线程池的状态设置为SHUTDOWN状态，然后中断空闲的线程。
        executor.shutdown();
        try {
            if (!executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS)) {
                Thread.currentThread().interrupt();
            }
        } catch (Exception e) {
            //停止正在执行的任务  shutdownNow的原理是遍历线程池中的工作线程，然后逐个调用线程的interrupt方法来中断线程，
            executor.shutdownNow();
            //通知现在停止
            Thread.currentThread().interrupt();
            //todo 可以发送一个邮件
            log.error("执行器等待关闭错误:{}", e.getMessage());
            //直接抛出异常 放到mq的顶部重新执行
            throw new RuntimeException("执行器等待关闭错误");
        } finally {
            //异步获取结果
                try {
                    //直接调用 Future 的函数式接口 的get方法 就可以获取到结果集
                  String  result = submit.get();
                    System.out.println("线程池结果:"+result);
                } catch (Exception ex) {
                    log.error("【获取线程结果错误】:{}", ex.getMessage(), ex);
                }
        }
    }

}

@AllArgsConstructor
@NoArgsConstructor
@Accessors(chain = true)
@Data
@Slf4j
class ApiTask implements Callable<String> {


    private String number;

    @Override
    public String call() {
        String name = Thread.currentThread().getName();
        log.info("线程开始执行:{}，是否通知结束{}", name, Thread.currentThread().isInterrupted());
        //防止通知停止   如果此线程已被中断，则为true ； 否则为false
        if (!Thread.currentThread().isInterrupted()) {
            return this.number;
        } else {
            log.error("xxxxxxxxxxxxxxxxxx此线程已被中断");
            return null;
        }
    }
}
